This is a short and simple tutorial on SparkR. We introduce SparkR and show a few simple examples.
You can find more information on the following pages.
R is a programming language that is very popular among Data Scientists (a Data Scientist being a better Computer Scientist than the average Statistician, and a better Statistician than the average Computer Scientist).
Framework for cluster computing (you can use it with Java, Scala, Python, …)
(spark+R): Framework for cluster computing using R
Let's set a variable that stores the location of Spark.
# --- SparkR setup -----------------------------------------------------------
# Location of the Spark installation; commented alternatives are paths used on
# other machines (env-variable lookup, macOS host).
## SPARK_HOME <- Sys.getenv("SPARK_HOME")
SPARK_HOME <- "/home/bartek/programs/spark-1.5.2-bin-hadoop2.6/"
## SPARK_HOME <- "/Users/CotePelaez/Documents/spark-1.5.2-bin-hadoop2.6/"
# Make the SparkR package bundled with the Spark distribution visible to
# library() by appending its R library directory to the search path.
.libPaths(c(.libPaths(), file.path(SPARK_HOME,"R/lib/")))
# Extra arguments passed to spark-submit at init time: fetch the spark-csv
# package so read.df() can parse CSV files later in this tutorial.
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"')
library(SparkR)
##
## Attaching package: 'SparkR'
##
## The following objects are masked from 'package:stats':
##
## filter, na.omit
##
## The following objects are masked from 'package:base':
##
## intersect, rbind, sample, subset, summary, table, transform
# Supporting packages: Java bridge, plotting, the %>>% pipe operator, and
# mustache-style string templating (used for SQL templates near the end).
library(rJava)
library(ggplot2)
library(pipeR)
library(whisker)
For educational purposes, Spark can be run on a single local machine. We achieve that by assigning the master to the local machine.
# Start a Spark context on the local machine, then build a Hive-aware SQL
# context on top of it (needed for createDataFrame / sql() calls below).
sc <- sparkR.init(master = "local", sparkHome = SPARK_HOME)
## Launching java with spark-submit command /home/bartek/programs/spark-1.5.2-bin-hadoop2.6//bin/spark-submit "--packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell" /tmp/RtmpUbpn6I/backend_port351e18f6f960
hiveContext <- sparkRHive.init(sc)
Now, we can get access to Spark UI at http://localhost:4040
A DataFrame is a Spark object that lets us do computations on distributed storage directly from R. We can create a DataFrame object from a standard data.frame as follows.
# mtcars is an ordinary in-memory R data.frame.
class(mtcars)
## [1] "data.frame"
# Copy it into Spark; createDataFrame returns a distributed SparkR DataFrame.
##df_mtcars <- createDataFrame(sqlContext, mtcars)
df_mtcars <- createDataFrame(hiveContext, mtcars)
class(df_mtcars)
## [1] "DataFrame"
## attr(,"package")
## [1] "SparkR"
# Printing shows only the schema -- the data itself stays in Spark.
df_mtcars
## DataFrame[mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]
# head() pulls the first rows back into R for display.
head(df_mtcars)
## mpg cyl disp hp drat wt qsec vs am gear carb
## 1 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4
## 2 21.0 6 160 110 3.90 2.875 17.02 0 1 4 4
## 3 22.8 4 108 93 3.85 2.320 18.61 1 1 4 1
## 4 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1
## 5 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2
## 6 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1
Note that this DataFrame object is not in the workspace of R; it lives entirely in Spark.
# Count six-cylinder cars; filter() takes a SQL-style condition string.
count(filter(df_mtcars, "cyl = 6"))
## [1] 7
# Same computation written with the pipeR %>>% pipe.
df_mtcars %>>%
filter("cyl = 6") %>>%
count
## [1] 7
# Register the DataFrame as a temporary table so it can be queried with SQL.
df_mtcars %>>%
registerTempTable("mtcars")
# List the tables known to the Hive context.
hiveContext %>>%
tables %>>%
collect
## tableName isTemporary
## 1 mtcars TRUE
# Same six-cylinder count, this time as a plain SQL query.
sql(hiveContext, "select count(*) from mtcars where cyl = 6") %>>% collect
## _c0
## 1 7
Spark does not execute calculations until we ask for results. Example:
# Load the Rossmann-style training CSV through the spark-csv data source;
# inferSchema asks Spark to guess column types from the data.
path1 <- "data/train.csv"
df1 <- read.df(hiveContext, path1,
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true")
head(df1)
## Store DayOfWeek Date Sales Customers Open Promo StateHoliday
## 1 1 5 2015-07-31 5263 555 1 1 0
## 2 2 5 2015-07-31 6064 625 1 1 0
## 3 3 5 2015-07-31 8314 821 1 1 0
## 4 4 5 2015-07-31 13995 1498 1 1 0
## 5 5 5 2015-07-31 4822 559 1 1 0
## 6 6 5 2015-07-31 5651 589 1 1 0
## SchoolHoliday
## 1 1
## 2 1
## 3 1
## 4 1
## 5 1
## 6 1
# Demonstrate lazy evaluation: building the aggregation pipeline is almost
# instantaneous because Spark only records the plan...
p <- proc.time()
df1_store3 <-
df1 %>>%
groupBy("Date") %>>%
agg(Sales="sum")
proc.time()-p
## user system elapsed
## 0.004 0.000 0.017
# ...whereas collecting results forces the actual computation (~6 s elapsed).
p <- proc.time()
df1_store3 %>>%
limit(10) %>>%
collect
## Date sum(Sales)
## 1 2015-07-14 9340405
## 2 2015-04-08 6790649
## 3 2014-01-20 10648819
## 4 2013-12-10 7458243
## 5 2013-08-06 6366382
## 6 2015-07-15 8756137
## 7 2015-04-09 6560475
## 8 2014-01-21 8475207
## 9 2013-12-11 7364610
## 10 2013-08-07 6414477
proc.time()-p
## user system elapsed
## 0.020 0.000 5.926
# Every new action recomputes the plan from scratch -- count() is slow too.
p <- proc.time()
df1_store3 %>>%
count
## [1] 942
proc.time()-p
## user system elapsed
## 0.004 0.000 7.368
# cache() asks Spark to keep the computed result in memory, so subsequent
# actions on the same DataFrame are faster.
df1_store3_cache <-
df1_store3 %>>%
cache
p <- proc.time()
df1_store3_cache %>>%
limit(10) %>>%
collect
## Date sum(Sales)
## 1 2015-07-14 9340405
## 2 2015-04-08 6790649
## 3 2014-01-20 10648819
## 4 2013-12-10 7458243
## 5 2013-08-06 6366382
## 6 2015-07-15 8756137
## 7 2015-04-09 6560475
## 8 2014-01-21 8475207
## 9 2013-12-11 7364610
## 10 2013-08-07 6414477
proc.time()-p
## user system elapsed
## 0.008 0.000 4.972
# Second action on the cached DataFrame: noticeably faster elapsed time.
p <- proc.time()
df1_store3_cache %>>%
count
## [1] 942
proc.time()-p
## user system elapsed
## 0.000 0.000 2.897
# Release the cached data when it is no longer needed.
df1_store3_cache %>>% unpersist()
## DataFrame[Date:string, sum(Sales):bigint]
Now it's time to meet a few SparkR functions. There are more of them, and they can be found in the SparkR documentation.
https://spark.apache.org/docs/latest/api/R/index.html
describe, filter, select, distinct, mutate (withColumn), collect, join. First we load our datasets.
# Load the two state datasets (properties and census divisions) via spark-csv.
states_properties_path <- "data/states_properties.csv"
states_division_path <- "data/states_division.csv"
sp_df <- read.df(hiveContext, states_properties_path,
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true")
sd_df <- read.df(hiveContext, states_division_path,
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true")
# describe() computes summary statistics (count/mean/stddev/min/max) per column.
sp_df %>>%
describe %>>%
collect
## summary c("50", NA, NA, "Alabama", "Wyoming") state_abb
## 1 count 50 50
## 2 mean <NA> <NA>
## 3 stddev <NA> <NA>
## 4 min Alabama AK
## 5 max Wyoming WY
## Population Income Illiteracy
## 1 50 50 50
## 2 4246.42 4435.8 1.1700000000000002
## 3 4419.621033934924 608.2942051343247 0.6034069936618229
## 4 365 3098 0.5
## 5 21198 6315 2.8
## Life_Exp Murder HS_Grad Frost
## 1 50 50 50 50
## 2 70.8786 7.377999999999998 53.10800000000002 104.46
## 3 1.3289018172904592 3.654437850066686 7.995819907926756 51.45841427793905
## 4 67.96 1.4 37.8 0
## 5 73.6 15.1 67.3 188
## Area
## 1 50
## 2 70735.88
## 3 NaN
## 4 1049
## 5 566432
# distinct() deduplicates: there are 9 distinct census divisions.
sd_df %>>%
select(sd_df$state_division) %>>%
distinct %>>%
count
## [1] 9
# mutate() adds a derived column; here, area converted from sq miles to km^2.
sp_df %>>%
mutate(Area_km2= (.$Area * 2.58999)) %>>%
head
## c("Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado"
## 1 Alabama
## 2 Alaska
## 3 Arizona
## 4 Arkansas
## 5 California
## 6 Colorado
## state_abb Population Income Illiteracy Life_Exp Murder HS_Grad Frost
## 1 AL 3615 3624 2.1 69.05 15.1 41.3 20
## 2 AK 365 6315 1.5 69.31 11.3 66.7 152
## 3 AZ 2212 4530 1.8 70.55 7.8 58.1 15
## 4 AR 2110 3378 1.9 70.66 10.1 39.9 65
## 5 CA 21198 5114 1.1 71.71 10.3 62.6 20
## 6 CO 2541 4884 0.7 72.06 6.8 63.9 166
## Area Area_km2
## 1 50708 131333.2
## 2 566432 1467053.2
## 3 113417 293748.9
## 4 51945 134537.0
## 5 156361 404973.4
## 6 103766 268752.9
# mutate() is not destructive: the original DataFrame is unchanged.
sp_df %>>%
head
## c("Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado"
## 1 Alabama
## 2 Alaska
## 3 Arizona
## 4 Arkansas
## 5 California
## 6 Colorado
## state_abb Population Income Illiteracy Life_Exp Murder HS_Grad Frost
## 1 AL 3615 3624 2.1 69.05 15.1 41.3 20
## 2 AK 365 6315 1.5 69.31 11.3 66.7 152
## 3 AZ 2212 4530 1.8 70.55 7.8 58.1 15
## 4 AR 2110 3378 1.9 70.66 10.1 39.9 65
## 5 CA 21198 5114 1.1 71.71 10.3 62.6 20
## 6 CO 2541 4884 0.7 72.06 6.8 63.9 166
## Area
## 1 50708
## 2 566432
## 3 113417
## 4 51945
## 5 156361
## 6 103766
# Join properties with divisions on the state abbreviation. Note the join key
# column (state_abb) appears twice in the result, once from each side.
s_df <-
sp_df %>>%
mutate(Area_km2= (.$Area * 2.58999)) %>>%
join(sd_df, .$state_abb == sd_df$state_abb)
head(s_df)
## c("Mississippi", "Montana", "Tennessee", "North Carolina", "North Dakota",
## 1 Mississippi
## 2 Montana
## 3 Tennessee
## 4 North Carolina
## 5 North Dakota
## 6 Nebraska
## state_abb Population Income Illiteracy Life_Exp Murder HS_Grad Frost
## 1 MS 2341 3098 2.4 68.09 12.5 41.0 50
## 2 MT 746 4347 0.6 70.56 5.0 59.2 155
## 3 TN 4173 3821 1.7 70.11 11.0 41.8 70
## 4 NC 5441 3875 1.8 69.21 11.1 38.5 80
## 5 ND 637 5087 0.8 72.78 1.4 50.3 186
## 6 NE 1544 4508 0.6 72.60 2.9 59.3 139
## Area Area_km2 c(24L, 26L, 42L, 33L, 34L, 27L) state_abb
## 1 47296 122496.2 24 MS
## 2 145587 377068.9 26 MT
## 3 41328 107039.1 42 TN
## 4 48798 126386.3 33 NC
## 5 69273 179416.4 34 ND
## 6 76483 198090.2 27 NE
## state_division
## 1 East South Central
## 2 Mountain
## 3 East South Central
## 4 South Atlantic
## 5 West North Central
## 6 West North Central
# Per-division aggregates: maximum income and total area.
# NOTE(review): the column is named `Income` (capital I); `s_df$income` only
# resolves because Spark SQL matches column names case-insensitively by
# default -- consider normalizing to `s_df$Income`. TODO confirm.
d_df <-
s_df %>>%
groupBy("state_division") %>>%
agg(max_income=max(s_df$income), area=sum(s_df$Area))
collect(d_df)
## state_division max_income area
## 1 Middle Atlantic 5237 100318
## 2 East South Central 3821 178982
## 3 East North Central 5107 244101
## 4 South Atlantic 5299 266909
## 5 Pacific 6315 891972
## 6 Mountain 5149 856047
## 7 West South Central 4188 427791
## 8 West North Central 5087 507723
## 9 New England 5348 62951
# Shortcut aggregation: avg() over several columns at once.
s_df %>>%
groupBy("state_division") %>>%
avg("Area", "Income") %>>%
collect()
## state_division avg(Area) avg(Income)
## 1 Middle Atlantic 33439.33 4863.000
## 2 East South Central 44745.50 3563.750
## 3 East North Central 48820.20 4669.000
## 4 South Atlantic 33363.62 4355.250
## 5 Pacific 178394.40 5183.200
## 6 Mountain 107005.88 4402.250
## 7 West South Central 106947.75 3773.500
## 8 West North Central 72531.86 4569.714
## 9 New England 10491.83 4423.833
# The same kind of derived-column computation expressed in SQL, then
# aggregated with DataFrame verbs.
registerTempTable(s_df, "s_table")
s2_df <-
sql(hiveContext,
"SELECT state_division, Population, Income * Population AS total_income FROM s_table")
s2_df %>>%
groupBy(.$state_division) %>>%
agg(total_income=sum(s2_df$total_income)) %>>%
collect
## state_division total_income
## 1 Middle Atlantic 179794689
## 2 East South Central 48870755
## 3 East North Central 193620781
## 4 South Atlantic 145896064
## 5 Pacific 142973847
## 6 Mountain 42734949
## 7 West South Central 82682251
## 8 West North Central 75534019
## 9 New England 57696396
# Reading a single CSV file...
path <- "data/subdata/test.csv"
data <- read.df(hiveContext, path,
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true")
count(data)
## [1] 41088
# ...versus pointing read.df at a directory: Spark reads every file inside
# it as one DataFrame (here 4 files of 41088 rows each).
path <- "data/subdata/"
data <- read.df(hiveContext, path,
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true")
count(data)
## [1] 164352
# A plain (non-Hive) SQL context is enough for reading Parquet files.
sqlContext <- sparkRSQL.init(sc)
# Parquet is the default source for read.df, so no `source` argument needed.
train_df <-
sqlContext %>>%
read.df("data/trainParquet")
head(train_df)
## Store DayOfWeek Date Sales Customers Open Promo StateHoliday
## 1 1 5 2015-07-31 5263 555 1 1 0
## 2 2 5 2015-07-31 6064 625 1 1 0
## 3 3 5 2015-07-31 8314 821 1 1 0
## 4 4 5 2015-07-31 13995 1498 1 1 0
## 5 5 5 2015-07-31 4822 559 1 1 0
## 6 6 5 2015-07-31 5651 589 1 1 0
## SchoolHoliday
## 1 1
## 2 1
## 3 1
## 4 1
## 5 1
## 6 1
# printSchema() shows the column types stored in the Parquet metadata
# (everything was saved as string here).
printSchema(train_df)
## root
## |-- Store: string (nullable = true)
## |-- DayOfWeek: string (nullable = true)
## |-- Date: string (nullable = true)
## |-- Sales: string (nullable = true)
## |-- Customers: string (nullable = true)
## |-- Open: string (nullable = true)
## |-- Promo: string (nullable = true)
## |-- StateHoliday: string (nullable = true)
## |-- SchoolHoliday: string (nullable = true)
test_df <-
sqlContext %>>%
read.df("data/testParquet")
printSchema(test_df)
## root
## |-- Id: string (nullable = true)
## |-- Store: string (nullable = true)
## |-- DayOfWeek: string (nullable = true)
## |-- Date: string (nullable = true)
## |-- Open: string (nullable = true)
## |-- Promo: string (nullable = true)
## |-- StateHoliday: string (nullable = true)
## |-- SchoolHoliday: string (nullable = true)
# Number of distinct stores in the training data.
train_df %>>%
select("Store") %>>% distinct %>>% count
## [1] 1115
# Daily total sales for the first half of 2014, collected into R and plotted
# with ggplot2. The commented filter shows the equivalent SQL-string form.
train_df %>>%
filter(.$Date <= '2014-07-31' & .$Date >= '2014-01-01') %>>%
#filter("Date <= '2014-07-31' AND Date >= '2014-01-01'") %>>%
groupBy("Date") %>>%
agg(Sales=sum(train_df$Sales)) %>>%
collect %>>%
ggplot() +
geom_bar(aes(as.Date(Date), Sales), stat="identity") +
xlab("Date") +
theme_bw()
# Promo days per weekday, sorted in decreasing order (Sat/Sun have none).
train_df %>>%
filter("Promo = 1") %>>%
groupBy("DayOfWeek") %>>%
agg(Promo=sum(train_df$Promo)) %>>%
orderBy(desc(.$Promo)) %>>%
collect
## DayOfWeek Promo
## 1 1 77760
## 2 2 77580
## 3 3 77580
## 4 4 77580
## 5 5 77580
# Register the DataFrame as a table so it can be queried with SQL strings.
train_df %>>%
registerTempTable("train")
query <- "SELECT COUNT(*) FROM train GROUP BY Date"
sql(sqlContext, query) %>>%
head
## _c0
## 1 1115
## 2 1115
## 3 1115
## 4 1115
## 5 1115
## 6 1115
# Same query built from a whisker (mustache) template: {{{col}}} is replaced
# by the value of `col` in the data list -- handy for parameterized reports.
query_template <-
"SELECT COUNT(*) FROM train GROUP BY {{{col}}}"
data <- list( col="Date")
query <- whisker.render(query_template, data)
sql(sqlContext, query) %>>%
head
## _c0
## 1 1115
## 2 1115
## 3 1115
## 4 1115
## 5 1115
## 6 1115
# BUG FIX: the original read `function() { ... }()`. In R that expression does
# NOT invoke the function -- the trailing `()` is parsed as part of the body,
# so the top-level result is just the closure itself, which auto-prints its
# own source (exactly what the captured output below this block shows).
# Wrapping the function in parentheses, `(function() { ... })()`, makes it an
# immediately-invoked call so the modeling code actually runs.
(function() {
  # MASS provides the Boston housing data set.
  library(MASS)

  # Ship the local data.frame into Spark.
  # NOTE(review): relies on `hiveContext` created earlier in this script.
  boston_df <- createDataFrame(hiveContext, Boston)

  # Simple linear model: median home value (medv) explained by the
  # lower-status population share (lstat); gaussian family = ordinary
  # least squares in SparkR's glm.
  lm.fit <-
    glm(medv ~ lstat,
        data = boston_df,
        family = "gaussian")
  summary(lm.fit)

  # Extract the fitted line's parameters for plotting.
  intercept <- summary(lm.fit)$coefficients[1]
  slope <- summary(lm.fit)$coefficients[2]

  # Plot a ~10% sample (without replacement) of the data together with the
  # fitted regression line.
  boston_df %>>%
    sample(FALSE, 0.1) %>>%
    collect %>>%
    ggplot() + geom_point(aes(lstat, medv)) +
      geom_abline(intercept = intercept, slope = slope)

  # Two-predictor model. (Also fixed: use `<-`, not `=`, for assignment.)
  lm.fit <- glm(medv ~ lstat + age, data = boston_df, family = "gaussian")
  summary(lm.fit)

  #library(ISLR)
  #Smarket_df <- createDataFrame(hiveContext, Smarket)
  #glm.fit <- glm(Direction~Lag1 + Lag2 + Lag3 + Lag4 + Lag5 + Volume,
  #               data=Smarket_df, family = "binomial" )
})()
## NOTE(review): the transcript below shows the anonymous function's
## *definition* being printed rather than executed -- evidence that
## `function() { ... }()` never actually ran the code above.
## function() {
## library(MASS)
##
## boston_df <- createDataFrame(hiveContext, Boston)
## lm.fit <-
## glm(medv~lstat,
## data=boston_df,
## family = "gaussian")
##
## summary(lm.fit)
##
## intercept <- summary(lm.fit)$coefficients[1]
## slope <- summary(lm.fit)$coefficients[2]
##
## boston_df %>>%
## sample(FALSE, 0.1) %>>%
## collect %>>%
## ggplot() + geom_point(aes(lstat, medv)) + geom_abline(intercept=intercept, slope=slope)
##
##
## lm.fit=glm(medv~lstat+age, data=boston_df, family = "gaussian")
##
## summary(lm.fit)
##
## #library(ISLR)
## #Smarket_df <- createDataFrame(hiveContext, Smarket)
## #glm.fit <- glm(Direction~Lag1 + Lag2 + Lag3 + Lag4 + Lag5 + Volume,
## # data=Smarket_df, family = "binomial" )
## }()